package fun.ticsmyc.rpc.client.transport.netty;

import fun.ticsmyc.rpc.client.util.RpcMessageChecker;
import fun.ticsmyc.rpc.common.entity.RpcRequest;
import fun.ticsmyc.rpc.common.entity.RpcResponse;
import fun.ticsmyc.rpc.common.factory.SingletonFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;

/**
 * 每个RpcRequestSender保存着一个channel ，用于向这个channel发送请求
 * 并通过completableFutureHelper接收响应
 * channel与RpcRequestSender绑定，每个远程地址都有一个唯一的RpcRequestSender
 * @author Ticsmyc
 * @date 2020-10-29 21:04
 */
public class RpcRequestSender {

    private static final Logger logger = LoggerFactory.getLogger(RpcRequestSender.class);

    private static final CompletableFutureHelper completableFutureHelper = SingletonFactory.getSingletonInstance(CompletableFutureHelper.class);

    private Channel channel;

    public RpcRequestSender(Bootstrap bootstrap,InetSocketAddress inetSocketAddress){
        this(bootstrap,inetSocketAddress.getHostName(),inetSocketAddress.getPort());
    }
    public RpcRequestSender(Bootstrap bootstrap, String host, int port){
        InetSocketAddress inetSocketAddress = new InetSocketAddress(host, port);
        try {
            ChannelFuture sync = bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    logger.debug("客户端连接到服务器{} : {}", inetSocketAddress.getHostString(), inetSocketAddress.getPort());
                } else {
                    logger.debug("连接服务器 {} 失败", inetSocketAddress);
                }
            }).sync();
            this.channel = sync.channel();
        } catch (InterruptedException e) {
            logger.error("未知错误",e);
        }
    }

    public Object sendAndGetReturnValue(RpcRequest rpcRequest){
        try {
            //提前创建好 用于接收该请求响应的future对象
            CompletableFuture<RpcResponse> rpcResponseCompletableFuture = new CompletableFuture<>();
            completableFutureHelper.put(rpcRequest.getRequestId(),rpcResponseCompletableFuture);
            //发送rpc请求
            ChannelFuture responseFuture = this.channel
                    .writeAndFlush(rpcRequest)
                    .addListener((ChannelFutureListener) future->{
                        if(future .isSuccess()){
                            logger.debug("客户端消息发送成功");
                        }else{
                            future.channel().close();
                            logger.error("发送消息失败:",future.cause());
                            completableFutureHelper.remove(rpcRequest.getRequestId());
                            //这里直接抛异常
                            rpcResponseCompletableFuture.completeExceptionally(future.cause());
                        }
                    });
            //在这里要拿到rpc请求返回的结果
            RpcResponse rpcResponse = rpcResponseCompletableFuture.get();
            RpcMessageChecker.check(rpcRequest,rpcResponse);
            return rpcResponse.getData();
        } catch (Exception e) {
            logger.error("发送消息时发生错误:",e);
            completableFutureHelper.remove(rpcRequest.getRequestId());
            e.printStackTrace();
            return null;
        }
    }

    public boolean isAlive(){
        return channel != null && channel.isActive();
    }

}
